{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Geowave GPX Demo\n",
    "This Demo runs KMeans on the GPX dataset consisting of approximately 285 million point locations. We use a cql filter to reduce the KMeans set to a bounding box over Berlin, Germany. Simply focus a cell and use [SHIFT + ENTER] to run the code."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Import pixiedust\n",
    "Start by importing pixiedust which if all bootstrap and install steps were run correctly.\n",
    "You should see below for opening the pixiedust database successfully with no errors.\n",
    "Depending on the version of pixiedust that gets installed, it may ask you to update.\n",
    "If so, run this first cell."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#!pip install --user --upgrade pixiedust"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import pixiedust\n",
    "import geowave_pyspark"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Pixiedust also allows us to monitor spark job progress directly from the notebook. Simply run the cell below and anytime a spark job is run from the notebook you should see incremental progress shown in the output below.\n",
    "*NOTE* If this function fails or produces a error often this is just a link issue between pixiedust and python the first time pixiedust is imported. Restart the Kernel and rerun the cells to fix the error."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "pixiedust.enableJobMonitor()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Creating the SQLContext and inspecting pyspark Context\n",
    "Pixiedust imports pyspark and the SparkContext + SparkSession should be already available through the \"sc\" and \"spark\" variables respectively."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Print Spark info and create sql_context\n",
    "print('Spark Version: {0}'.format(sc.version))\n",
    "print('Python Version: {0}'.format(sc.pythonVer))\n",
    "print('Application Name: {0}'.format(sc.appName))\n",
    "print('Application ID: {0}'.format(sc.applicationId))\n",
    "print('Spark Master: {0}'.format( sc.master))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Download and ingest the GPX data\n",
    "*NOTE* Depending on cluster size sometimes the copy can fail. This appears to be a race condition error with the copy command when downloading the files from s3. This may make the following import into acccumulo command fail. You can check the accumulo tables by looking at port 9995 of the emr cluster. There should be 5 tables after importing."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%bash\n",
    "s3-dist-cp -D mapreduce.task.timeout=60000000 --src=s3://geowave-gpx-data/gpx --dest=hdfs://$HOSTNAME:8020/tmp/"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%bash\n",
    "/opt/accumulo/bin/accumulo shell -u root -p secret -e \"importtable geowave.germany_gpx_SPATIAL_IDX /tmp/spatial\"\n",
    "/opt/accumulo/bin/accumulo shell -u root -p secret -e \"importtable geowave.germany_gpx_GEOWAVE_METADATA /tmp/metadata\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Setup Datastores"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%bash\n",
    "# clear out potential old runs\n",
    "geowave store clear kmeans_gpx\n",
    "geowave store rm kmeans_gpx\n",
    "geowave store clear germany_gpx_accumulo\n",
    "geowave store rm germany_gpx_accumulo\n",
    "\n",
    "# configure geowave connection params for name stores \"germany_gpx_accumulo\" and \"kmeans_gpx\"\n",
    "geowave store add germany_gpx_accumulo --gwNamespace geowave.germany_gpx -t accumulo --zookeeper $HOSTNAME:2181 --instance accumulo --user root --password secret\n",
    "geowave store add kmeans_gpx --gwNamespace geowave.kmeans -t accumulo --zookeeper $HOSTNAME:2181 --instance accumulo --user root --password secret"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Run KMeans\n",
    "Run Kmeans on the reduced dataset over Berlin, Germany. Once the spark job begins running you should be able to monitor its progress from the cell with pixiedust, or you can monitor the progress from the spark history server on the emr cluster."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%bash\n",
    "\n",
    "geowave store clear kmeans_gpx"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Pull core GeoWave datastore classes\n",
    "hbase_options_class = sc._jvm.org.locationtech.geowave.datastore.hbase.cli.config.HBaseRequiredOptions\n",
    "accumulo_options_class = sc._jvm.org.locationtech.geowave.datastore.accumulo.cli.config.AccumuloRequiredOptions\n",
    "query_options_class = sc._jvm.org.locationtech.geowave.core.store.query.QueryOptions\n",
    "byte_array_class = sc._jvm.org.locationtech.geowave.core.index.ByteArrayId\n",
    "# Pull core GeoWave Spark classes from jvm\n",
    "geowave_rdd_class = sc._jvm.org.locationtech.geowave.analytic.spark.GeoWaveRDD\n",
    "rdd_loader_class = sc._jvm.org.locationtech.geowave.analytic.spark.GeoWaveRDDLoader\n",
    "rdd_options_class = sc._jvm.org.locationtech.geowave.analytic.spark.RDDOptions\n",
    "sf_df_class = sc._jvm.org.locationtech.geowave.analytic.spark.sparksql.SimpleFeatureDataFrame\n",
    "kmeans_runner_class = sc._jvm.org.locationtech.geowave.analytic.spark.kmeans.KMeansRunner\n",
    "\n",
    "datastore_utils_class = sc._jvm.org.locationtech.geowave.core.store.util.DataStoreUtils\n",
    "\n",
    "spatial_encoders_class = sc._jvm.org.locationtech.geowave.analytic.spark.sparksql.GeoWaveSpatialEncoders\n",
    "\n",
    "spatial_encoders_class.registerUDTs()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#setup input datastore\n",
    "input_store = accumulo_options_class()\n",
    "input_store.setInstance('accumulo')\n",
    "input_store.setUser('root')\n",
    "input_store.setPassword('secret')\n",
    "input_store.setZookeeper(os.environ['HOSTNAME'] + ':2181')\n",
    "input_store.setGeowaveNamespace('geowave.germany_gpx')\n",
    "\n",
    "#Setup output datastore\n",
    "output_store = accumulo_options_class()\n",
    "output_store.setInstance('accumulo')\n",
    "output_store.setUser('root')\n",
    "output_store.setPassword('secret')\n",
    "output_store.setZookeeper(os.environ['HOSTNAME'] + ':2181')\n",
    "output_store.setGeowaveNamespace('geowave.kmeans')\n",
    "\n",
    "#Create a instance of the runner\n",
    "kmeans_runner = kmeans_runner_class()\n",
    "\n",
    "input_store_plugin = input_store.createPluginOptions()\n",
    "output_store_plugin = output_store.createPluginOptions()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#set the appropriate properties\n",
    "#We want it to execute using the existing JavaSparkContext wrapped by python.\n",
    "kmeans_runner.setSparkSession(sc._jsparkSession)\n",
    "\n",
    "kmeans_runner.setAdapterId('gpxpoint')\n",
    "kmeans_runner.setNumClusters(8)\n",
    "kmeans_runner.setInputDataStore(input_store_plugin)\n",
    "kmeans_runner.setOutputDataStore(output_store_plugin)\n",
    "kmeans_runner.setCqlFilter(\"BBOX(geometry,  13.3, 52.45, 13.5, 52.5)\")\n",
    "kmeans_runner.setCentroidTypeName('mycentroids')\n",
    "kmeans_runner.setHullTypeName('myhulls')\n",
    "kmeans_runner.setGenerateHulls(True)\n",
    "kmeans_runner.setComputeHullData(True)\n",
    "#execute the kmeans runner\n",
    "kmeans_runner.run()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Load Centroids into DataFrame and display"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Create the dataframe and get a rdd for the output of kmeans\n",
    "# Grab adapter and setup query options for rdd load\n",
    "adapter_id = byte_array_class('mycentroids')\n",
    "query_adapter = datastore_utils_class.getDataAdapter(output_store_plugin, adapter_id)\n",
    "query_options = query_options_class(query_adapter)\n",
    "\n",
    "# Create RDDOptions for loader\n",
    "rdd_options = rdd_options_class()\n",
    "rdd_options.setQueryOptions(query_options)\n",
    "output_rdd = rdd_loader_class.loadRDD(sc._jsc.sc(), output_store_plugin, rdd_options)\n",
    "\n",
    "# Create a SimpleFeatureDataFrame from the GeoWaveRDD\n",
    "sf_df = sf_df_class(spark._jsparkSession)\n",
    "sf_df.init(output_store_plugin, adapter_id)\n",
    "df = sf_df.getDataFrame(output_rdd)\n",
    "\n",
    "# Convert Java DataFrame to Python DataFrame\n",
    "import pyspark.mllib.common as convert\n",
    "py_df = convert._java2py(sc, df)\n",
    "\n",
    "py_df.createOrReplaceTempView('mycentroids')\n",
    "\n",
    "df = spark.sql(\"select * from mycentroids\")\n",
    "\n",
    "display(df)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Parse DataFrame data into lat/lon columns and display centroids on map\n",
    "Using pixiedust's built in map visualization we can display data on a map assuming it has the following properties.\n",
    "- Keys: put your latitude and longitude fields here. They must be floating values. These fields must be named latitude, lat or y and longitude, lon or x.\n",
    "- Values: the field you want to use to thematically color the map. Only one field can be used.\n",
    "\n",
    "Also you will need a access token from whichever map renderer you choose to use with pixiedust (mapbox, google).\n",
    "Follow the instructions in the token help on how to create and use the access token."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Convert the string point information into lat long columns and create a new dataframe for those.\n",
    "import pyspark\n",
    "def parseRow(row):\n",
    "    lat=row.geom.y\n",
    "    lon=row.geom.x\n",
    "    return pyspark.sql.Row(lat=lat,lon=lon,ClusterIndex=row.ClusterIndex)\n",
    "    \n",
    "row_rdd = df.rdd\n",
    "\n",
    "new_rdd = row_rdd.map(lambda row: parseRow(row))\n",
    "\n",
    "new_df = new_rdd.toDF()\n",
    "display(new_df)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Export KMeans Hulls to DataFrame\n",
    "If you have some more complex data to visualize pixiedust may not be the best option.\n",
    "\n",
    "The Kmeans hull generation outputs polygons that would be difficult for pixiedust to display without\n",
    "creating a special plugin. \n",
    "\n",
    "Instead, we can use another map renderer to visualize our data. For the Kmeans hulls we will use folium to visualize the data. Folium allows us to easily add wms layers to our notebook, and we can combine that with GeoWaves geoserver functionality to render the hulls and centroids. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Create the dataframe and get a rdd for the output of kmeans\n",
    "# Grab adapter and setup query options for rdd load\n",
    "adapter_id = byte_array_class('myhulls')\n",
    "query_adapter = datastore_utils_class.getDataAdapter(output_store_plugin, adapter_id)\n",
    "query_options = query_options_class(query_adapter)\n",
    "\n",
    "# Use GeoWaveRDDLoader to load an RDD\n",
    "rdd_options = rdd_options_class()\n",
    "rdd_options.setQueryOptions(query_options)\n",
    "output_rdd_hulls = rdd_loader_class.loadRDD(sc._jsc.sc(), output_store_plugin, rdd_options)\n",
    "\n",
    "\n",
    "# Create a SimpleFeatureDataFrame from the GeoWaveRDD\n",
    "sf_df_hulls = sf_df_class(spark._jsparkSession)\n",
    "sf_df_hulls.init(output_store_plugin, adapter_id)\n",
    "df_hulls = sf_df_hulls.getDataFrame(output_rdd_hulls)\n",
    "\n",
    "# Convert Java DataFrame to Python DataFrame\n",
    "import pyspark.mllib.common as convert\n",
    "py_df_hulls = convert._java2py(sc, df_hulls)\n",
    "\n",
    "# Create a sql table view of the hulls data\n",
    "py_df_hulls.createOrReplaceTempView('myhulls')\n",
    "\n",
    "# Run SQL Query on Hulls data\n",
    "df_hulls = spark.sql(\"select * from myhulls order by Density\")\n",
    "\n",
    "display(df_hulls)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Visualize results using geoserver and wms\n",
    "folium provides an easy way to visualize leaflet maps in jupyter notebooks. When the data is too complicated or big to work within the simple framework pixiedust provides for map display we can instead turn to geoserver and wms to render our layers. First we configure geoserver then setup wms layers for folium to display the kmeans results on the map."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%bash\n",
    "# set up geoserver\n",
    "geowave config geoserver \"$HOSTNAME:8000\"\n",
    "\n",
    "# add the centroids layer\n",
    "geowave gs layer add kmeans_gpx -id mycentroids\n",
    "geowave gs style set mycentroids --styleName point\n",
    "\n",
    "# add the hulls layer\n",
    "geowave gs layer add kmeans_gpx -id myhulls\n",
    "geowave gs style set myhulls --styleName line"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import owslib\n",
    "from owslib.wms import WebMapService\n",
    "\n",
    "url = \"http://\" + os.environ['HOSTNAME'] + \":8000/geoserver/geowave/wms\"\n",
    "web_map_services = WebMapService(url)\n",
    "\n",
    "#print layers available wms\n",
    "print('\\n'.join(web_map_services.contents.keys()))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import folium\n",
    "#grab wms info for centroids\n",
    "layer = 'mycentroids'\n",
    "wms = web_map_services.contents[layer]\n",
    "\n",
    "#build center of map off centroid bbox\n",
    "lon = (wms.boundingBox[0] + wms.boundingBox[2]) / 2.\n",
    "lat = (wms.boundingBox[1] + wms.boundingBox[3]) / 2.\n",
    "center = [lat, lon]\n",
    "\n",
    "m = folium.Map(location = center,zoom_start=10)\n",
    "\n",
    "\n",
    "name = wms.title\n",
    "centroids = folium.raster_layers.WmsTileLayer(\n",
    "    url=url,\n",
    "    name=name,\n",
    "    fmt='image/png',\n",
    "    transparent=True,\n",
    "    layers=layer,\n",
    "    overlay=True,\n",
    "    COLORSCALERANGE='1.2,28',\n",
    ")\n",
    "centroids.add_to(m)\n",
    "\n",
    "layer = 'myhulls'\n",
    "wms = web_map_services.contents[layer]\n",
    "\n",
    "name = wms.title\n",
    "hulls = folium.raster_layers.WmsTileLayer(\n",
    "    url=url,\n",
    "    name=name,\n",
    "    fmt='image/png',\n",
    "    transparent=True,\n",
    "    layers=layer,\n",
    "    overlay=True,\n",
    "    COLORSCALERANGE='1.2,28',\n",
    ")\n",
    "hulls.add_to(m)\n",
    "m"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    ""
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python with Pixiedust (Spark 2.3)",
   "language": "python",
   "name": "pythonwithpixiedustspark23"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3.0
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.5"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}